#include "config.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <errno.h>
#include <getopt.h>
#include <pthread.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include <pthread.h>

#include "cliserv.h"
#include "nbd.h"
#include "stor_root.h"
#include "core.h"
#include "corenet.h"
#include "../../storage/controller/volume_ctl.h"
#include "get_version.h"
#include "mem_cache.h"
#include "job_dock.h"
#include "../../ynet/sock/sock_tcp.h"
#include "../../ynet/net/net_events.h"
#include "lichstor.h"
#include "analysis.h"
#include "types.h"
#include "sock_unix.h"
#include "dbg.h"

/* Transport of an accepted NBD connection, stored in nbd_session_t.type. */
#define __TYPE_UNIX__ 1
#define __TYPE_TCP__ 2

/*
 * Per-connection NBD session state.
 *
 * Allocated by nbd_session_alloc when a client is accepted, filled in by
 * __nbd_export_name during negotiation, then handed to core_attach as the
 * context passed to __nbd_newtask / __nbd_close.
 */
typedef struct nbd_session {
        fileid_t id;                    /* volume id resolved from the export name */
        sockid_t sockid;                /* corenet socket id, filled in by __nbd_srv_attach */
        int sd;                         /* accepted socket descriptor */
        int type;                       /* __TYPE_UNIX__ or __TYPE_TCP__ */
        uint32_t addr;                  /* peer IPv4 address for TCP; a fixed placeholder for unix sockets */
        mcache_entry_t *entry;          /* direct mcache entry; left NULL (and YASSERTed) by the visible code */
        char pool[MAX_NAME_LEN];        /* pool part of the export name (path_split) */
        char export_name[MAX_NAME_LEN]; /* path part of the export name (path_split) */
        uint32_t cflags;                /* client handshake flags, host byte order */
        time_t last_check;              /* only referenced by the disabled __nbd_srv_check */
        uint64_t exportsize;            /* st_size reported to the client */
        uint32_t ref;                   /* in-flight I/O tasks, see __nbd_get/__nbd_put */
        char erase;                     /* set by __nbd_close; the last __nbd_put then frees the session */
        char localized;                 /* nonzero once stor_localize has been attempted */
} nbd_session_t;

/*
 * Process-wide handshake flags. __nbd_negotiate ORs in F_NO_ZEROES whenever a
 * client requests NBD_FLAG_C_NO_ZEROES; nothing in the visible code clears it.
 * NOTE(review): written from every negotiation thread without locking.
 */
static int glob_flags=0;
/* Listening socket for TCP clients (created in __nbd_tcp_passive). */
static int __tcp_listen_sd__;
/* Unix-domain listener socket and address; presumably set up later in this file — confirm. */
static int __unix_listen_sd__;
static struct sockaddr_un __unix_listen_addr__;

/*
 * Handshake magics. opts_magic is ASCII "IHAVEOPT" (sent by the server and
 * expected before every client option); rep_magic prefixes option replies.
 * cliserv_magic is not referenced in the visible code.
 */
const uint64_t cliserv_magic = 0x00420281861253LL;
const uint64_t opts_magic = 0x49484156454F5054LL;
const uint64_t rep_magic = 0x3e889045565a9LL;
/* First 8 bytes the server writes on a new connection. */
#define INIT_PASSWD "NBDMAGIC"

/* Session reference counting; __nbd_func uses these before their definitions. */
static void __nbd_get(nbd_session_t *se);
static void __nbd_put(nbd_session_t *se);

/*
 * Translate a local errno value into the error code carried in an NBD
 * reply, already in network byte order.
 *
 * EPERM -> 1, EIO -> 5, ENOMEM -> 12, EFBIG/ENOSPC -> 28; everything else
 * (EINVAL included) is reported as 22.
 */
static int __nbd_errno(int errcode) {
        uint32_t wire;

        if (errcode == EPERM)
                wire = 1;
        else if (errcode == EIO)
                wire = 5;
        else if (errcode == ENOMEM)
                wire = 12;
        else if (errcode == EFBIG || errcode == ENOSPC)
                wire = 28; // ENOSPC
        else
                wire = 22; // EINVAL

        return htonl(wire);
}

/*
 * Obtain a referenced mcache entry for volume @volid.
 *
 * The volume_ctl handle from volume_ctl_get is always released again; on
 * success the caller owns the reference taken with mcache_ref and gets the
 * entry in *@_entry. Returns 0 or the error from either call.
 */
STATIC int __nbd_get_entry(const volid_t *volid, mcache_entry_t **_entry)
{
        int ret;
        mcache_entry_t *ent;

        ret = volume_ctl_get(volid, &ent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_ref(ent);
        volume_ctl_release(ent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_entry = ent;
        return 0;
err_ret:
        return ret;
}

/*
 * Allocate a session for a freshly accepted connection.
 *
 * @_se:  out, the new session
 * @sd:   accepted socket descriptor
 * @type: __TYPE_UNIX__ or __TYPE_TCP__
 * @addr: peer address, later copied into the corenet sockid
 *
 * The structure is zeroed first. Without that, fields filled in only later
 * (entry, localized, id, pool, export_name, last_check, ...) would start
 * with heap garbage.
 * Returns 0 or the ymalloc error.
 */
static int nbd_session_alloc(nbd_session_t **_se, int sd, int type, uint32_t addr)
{
        int ret;
        void *ptr;
        nbd_session_t *nse;

        ret = ymalloc((void **)&ptr, sizeof(nbd_session_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        nse = (nbd_session_t *)ptr;
        memset(nse, 0x0, sizeof(*nse));
        nse->type = type;
        nse->sd = sd;
        nse->addr = addr;
        nse->entry = NULL;
        nse->ref = 0;
        nse->erase = 0;
        nse->localized = 0;

        *_se = nse;

        return 0;
err_ret:
        return ret;
}

static int __nbd_send_reply(uint32_t opt, int fd, uint32_t reply_type, size_t datasize, void *data)
{
        int ret;
        ssize_t total;
        uint64_t magic = htonll(0x3e889045565a9LL);
        uint32_t datsize = htonl(datasize);
        reply_type = htonl(reply_type);
        opt = htonl(opt);
        struct iovec v_data[] = {
                { &magic, sizeof(magic) },
                { &opt, sizeof(opt) },
                { &reply_type, sizeof(reply_type) },
                { &datsize, sizeof(datsize) },
                { data, datasize },
        };

        total = sizeof(magic) + sizeof(opt) + sizeof(reply_type) + sizeof(datsize) + datasize;

        ret = writev(fd, v_data, 5);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        if(ret != total) {
                ret = ECONNRESET;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Send the export-info part of the NBD_OPT_EXPORT_NAME reply.
 *
 * The fields are, all big-endian:
 * - the 64-bit export size;
 * - the 16-bit transmission flags;
 * - unless the client negotiated NBD_FLAG_C_NO_ZEROES, 124 bytes of zero
 *   padding.
 *
 * Returns 0, or ECONNRESET when any of the writes fails or is short.
 */
static int __send_export_info(int fd, uint64_t size, uint32_t cflag)
{
        int ret;
        uint64_t size_be = htonll((uint64_t)(size));
        /*
         * Only NBD_FLAG_HAS_FLAGS is advertised. SEND_FLUSH, SEND_FUA,
         * ROTATIONAL and SEND_TRIM are not offered, which matches
         * __nbd_newtask handling only READ, WRITE and DISC.
         */
        uint16_t flags = NBD_FLAG_HAS_FLAGS;
        char zeros[124];

        ret = write(fd, &size_be, sizeof(size_be));
        if (ret != (int)sizeof(size_be)) {
                ret = ECONNRESET;
                GOTO(err_ret, ret);
        }

        flags = htons(flags);
        ret = write(fd, &flags, sizeof(flags));
        if (ret != (int)sizeof(flags)) {
                ret = ECONNRESET;
                GOTO(err_ret, ret);
        }

        if (!(cflag & NBD_FLAG_C_NO_ZEROES)) {
                DBUG("send zero\n");
                memset(zeros, 0x0, sizeof(zeros));
                ret = write(fd, zeros, sizeof(zeros));
                /* A short padding write would desynchronise the handshake too. */
                if (ret != (int)sizeof(zeros)) {
                        ret = ECONNRESET;
                        GOTO(err_ret, ret);
                }
        } else {
                DBUG("no zero\n");
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Handle NBD_OPT_EXPORT_NAME, the final negotiation option.
 *
 * Steps:
 * 1. Read the 32-bit big-endian name length and the name from @fd.
 * 2. Split the name into pool and path (path_split).
 * 3. Resolve it with stor_lookup1, retrying EAGAIN via USLEEP_RETRY, then
 *    call stor_getattr.
 * 4. Record id, size, flags, pool and path in @nse.
 * 5. Send the export info.
 *
 * @opt:    option code (unused)
 * @fd:     negotiation socket
 * @nse:    session being set up
 * @cflags: client handshake flags (host byte order)
 *
 * Returns 0 or an errno-style code. An over-long name gives EIO. ECONNRESET
 * means the peer closed before the full length or name arrived.
 */
static int  __nbd_export_name(uint32_t opt, int fd, nbd_session_t *nse, uint32_t cflags)
{
        int ret, retry = 0;
        ssize_t got;
        uint32_t namelen;
        char name[MAX_NAME_LEN], pool[MAX_NAME_LEN], path2[MAX_PATH_LEN];
        struct stat stbuf;
        fileid_t id;

        (void) opt;

        /*
         * MSG_WAITALL blocks until every requested byte has arrived. A plain
         * read() may return early, or return 0 on EOF, which would leave
         * namelen uninitialised.
         */
        got = recv(fd, &namelen, sizeof(namelen), MSG_WAITALL);
        if (got != (ssize_t)sizeof(namelen)) {
                ret = got < 0 ? errno : ECONNRESET;
                DWARN("Negotiation failed/7: \n");
                GOTO(err_ret, ret);
        }

        namelen = ntohl(namelen);
        /*
         * Compare without "namelen + 1": a client-supplied 0xffffffff would
         * wrap to 0, pass the check and make name[namelen] write far out of
         * bounds.
         */
        if (namelen >= MAX_NAME_LEN) {
                DWARN("Negotiation failed/8 namelen is too long\n");
                ret = EIO;
                GOTO(err_ret, ret);
        }

        DBUG("namelen %u\n", namelen);

        got = recv(fd, name, namelen, MSG_WAITALL);
        if (got != (ssize_t)namelen) {
                ret = got < 0 ? errno : ECONNRESET;
                DWARN("Negotiation failed/9 \n");
                GOTO(err_ret, ret);
        }
        name[namelen] = 0;

        DINFO("the client export name is %s\n", name);
        path_split(name, pool, path2);

retry:
        ret = stor_lookup1(pool, path2, &id);
        if (ret) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = stor_getattr(pool, &id, &stbuf);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DWARN("%s deleted\n", name);
                }
                GOTO(err_ret, ret);
        }

        nse->id = id;
        nse->exportsize = stbuf.st_size;
        nse->cflags = cflags;
        nse->entry = NULL;
        /* Bounded copies: path2 is a MAX_PATH_LEN buffer, export_name only MAX_NAME_LEN. */
        snprintf(nse->pool, sizeof(nse->pool), "%s", pool);
        snprintf(nse->export_name, sizeof(nse->export_name), "%s", path2);
        DINFO("nbd export name %s\n", nse->export_name);

        /*
         * Localization is deferred. __nbd_write and __nbd_read call
         * stor_localize() on first use while localized is 0. The direct
         * mcache-entry path is disabled, so entry stays NULL.
         */
        nse->localized = 0;

        ret = __send_export_info(fd, nse->exportsize, cflags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        DWARN("reply error to %d\n", fd);
        return ret;
}

/*
 * Emit one NBD_REP_SERVER reply to NBD_OPT_LIST describing export @name.
 *
 * Fields are written in this order:
 * 1. rep_magic;
 * 2. the option code;
 * 3. the reply type;
 * 4. the payload length (name length plus its 32-bit length prefix);
 * 5. the name length;
 * 6. the name bytes.
 *
 * Each field goes out in its own _write() call. Any failed or short write
 * returns EINVAL.
 */
inline static int __nbd_list__(int fd, const char *name)
{
        int i, ret;
        uint64_t name_len = strlen(name);
        uint64_t magic = cpu_to_be64(rep_magic);
        uint32_t opt = cpu_to_be32(NBD_OPT_LIST);
        uint32_t type = cpu_to_be32(NBD_REP_SERVER);
        uint32_t payload_len = cpu_to_be32(name_len + sizeof(uint32_t));
        uint32_t str_len = cpu_to_be32(name_len);
        const struct {
                const void *ptr;
                size_t len;
        } field[] = {
                { &magic, sizeof(magic) },
                { &opt, sizeof(opt) },
                { &type, sizeof(type) },
                { &payload_len, sizeof(payload_len) },
                { &str_len, sizeof(str_len) },
                { name, name_len },
        };

        for (i = 0; i < (int)(sizeof(field) / sizeof(field[0])); i++) {
                ret = _write(fd, field[i].ptr, field[i].len);
                if (ret < 0 || (size_t)ret != field[i].len) {
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Read exactly @len bytes from negotiation socket @fd into @buf.
 * Returns 0, the errno of a failed recv(), or ECONNRESET when the peer
 * closes (or a signal interrupts the wait) before @len bytes arrive.
 */
static int __nbd_neg_recv(int fd, void *buf, size_t len)
{
        ssize_t got;

        got = recv(fd, buf, len, MSG_WAITALL);
        if (got < 0)
                return errno;

        if ((size_t)got != len)
                return ECONNRESET;

        return 0;
}

/*
 * Write all @len bytes of @buf to @fd in one write().
 * Returns 0, the errno of a failed write(), or ECONNRESET on a short write.
 */
static int __nbd_neg_send(int fd, const void *buf, size_t len)
{
        ssize_t sent;

        sent = write(fd, buf, len);
        if (sent < 0)
                return errno;

        if ((size_t)sent != len)
                return ECONNRESET;

        return 0;
}

/*
 * Read and drop @len bytes of option payload, using @tmp (@tmplen bytes)
 * as scratch space. Returns 0 or the error from __nbd_neg_recv.
 */
static int __nbd_neg_discard(int fd, uint32_t len, char *tmp, size_t tmplen)
{
        int ret;
        size_t chunk;

        while (len > 0) {
                chunk = len < tmplen ? len : tmplen;
                ret = __nbd_neg_recv(fd, tmp, chunk);
                if (ret)
                        return ret;
                len -= chunk;
        }

        return 0;
}

/*
 * Run the server side of the fixed-newstyle NBD handshake on @fd.
 *
 * The server sends INIT_PASSWD, opts_magic and its handshake flags
 * (FIXED_NEWSTYLE | NO_ZEROES), then reads the client flags. It then
 * processes client options until one of these happens:
 * - NBD_OPT_EXPORT_NAME: __nbd_export_name fills in @se and its result is
 *   returned.
 * - NBD_OPT_ABORT: returns EIO.
 * - An I/O or protocol error: returns that error.
 *
 * Every other option's payload is consumed exactly and answered with
 * NBD_REP_ERR_UNSUP. Clients that negotiated FIXED_NEWSTYLE may then carry
 * on with another option, as that mode requires. Unknown options from other
 * clients still end the session with EINVAL.
 */
static int  __nbd_negotiate(int fd, nbd_session_t *se)
{
        int ret;
        uint16_t smallflags = NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES;
        uint64_t magic;
        uint32_t cflags = 0;
        uint32_t opt, optlen;
        char tmp[MAX_BUF_LEN];

        ret = __nbd_neg_send(fd, INIT_PASSWD, 8);
        if (unlikely(ret)) {
                DWARN("Negotiation failed/1: \n");
                GOTO(err_ret, ret);
        }

        magic = htonll(opts_magic);
        ret = __nbd_neg_send(fd, &magic, sizeof(magic));
        if (unlikely(ret)) {
                DWARN("Negotiation failed/2: \n");
                GOTO(err_ret, ret);
        }

        smallflags = htons(smallflags);
        ret = __nbd_neg_send(fd, &smallflags, sizeof(smallflags));
        if (unlikely(ret)) {
                DWARN("Negotiation failed/3: \n");
                GOTO(err_ret, ret);
        }

        ret = __nbd_neg_recv(fd, &cflags, sizeof(cflags));
        if (unlikely(ret)) {
                DWARN("Negotiation failed/4: \n");
                GOTO(err_ret, ret);
        }

        cflags = ntohl(cflags);
        if (cflags & NBD_FLAG_C_NO_ZEROES) {
                DBUG("no zero\n");
                /* NOTE(review): process-wide flag, shared by all sessions. */
                glob_flags |= F_NO_ZEROES;
        } else {
                DBUG("send zero\n");
        }

        if (cflags & NBD_FLAG_C_FIXED_NEWSTYLE) {
                DINFO("fixed new style\n");
        } else {
                DINFO("no fixed new style\n");
        }

        for (;;) {
                ret = __nbd_neg_recv(fd, &magic, sizeof(magic));
                if (unlikely(ret)) {
                        DWARN("Negotiation failed/5: \n");
                        GOTO(err_ret, ret);
                }

                magic = ntohll(magic);
                if (magic != opts_magic) {
                        DWARN("Negotiation failed/5a: magic mismatch\n");
                        ret = EIO;
                        GOTO(err_ret, ret);
                }

                ret = __nbd_neg_recv(fd, &opt, sizeof(opt));
                if (unlikely(ret)) {
                        DWARN("Negotiation failed/6: %s\n", strerror(ret));
                        GOTO(err_ret, ret);
                }

                opt = ntohl(opt);

                /*
                 * NBD_OPT_EXPORT_NAME must be the last option. Its
                 * length and name are read by __nbd_export_name.
                 */
                if (opt == NBD_OPT_EXPORT_NAME)
                        return __nbd_export_name(opt, fd, se, cflags);

                if (opt == NBD_OPT_ABORT) {
                        DWARN("Session terminated by client\n");
                        ret = EIO;
                        GOTO(err_ret, ret);
                }

                /*
                 * Every other option carries a 32-bit length and payload.
                 * Consume exactly that, so the next option header is not
                 * swallowed.
                 */
                ret = __nbd_neg_recv(fd, &optlen, sizeof(optlen));
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __nbd_neg_discard(fd, ntohl(optlen), tmp, sizeof(tmp));
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* NBD_OPT_LIST is not implemented either (handle_list is disabled). */
                ret = __nbd_send_reply(opt, fd, NBD_REP_ERR_UNSUP, 0, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (opt != NBD_OPT_LIST && !(cflags & NBD_FLAG_C_FIXED_NEWSTYLE)) {
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                }
        }

err_ret:
        return ret;
}

/*
 * Disabled (#if 0), kept for reference only.
 *
 * For a request header at the start of @buf it would report the header
 * length in *msg_len and the payload length in *io_len. The payload length
 * is non-zero only for NBD_CMD_WRITE. The live request parsing is done
 * inline in __nbd_newtask.
 */
#if 0
static int __nbd_pack_len(void *buf, uint32_t len, int *msg_len, int *io_len)
{
        //uint32_t *length, _len, credlen, verilen, headlen;
        const nbd_request_t *req;
        uint32_t type, command;

        if (len < sizeof(nbd_request_t)) {
                DERROR("less then sunrpc_head_t\n");
                return 0;
        }

        req = buf;
        type = ntohl(req->type);
        command = type & NBD_CMD_MASK_COMMAND;
        if (command == NBD_CMD_WRITE) {
                *msg_len =  sizeof(nbd_request_t);
                *io_len = ntohl(req->size);
        } else {
                *msg_len =  sizeof(nbd_request_t);
                *io_len = 0;
        }

        DBUG("__msg__ %u %u\n", *msg_len, *io_len);

        return 0;
}
#endif

/*
 * Initialise @buf with an NBD reply header for @req.
 *
 * The header gets NBD_REPLY_MAGIC, the request's handle echoed back, and an
 * error field of 0 when @retval is 0, otherwise __nbd_errno(@retval).
 * Returns 0 or the mbuffer_init error.
 */
static int __nbd_reply_prep(buffer_t *buf, int retval, const nbd_request_t *req)
{
        int ret;
        nbd_reply_t *reply;

        ret = mbuffer_init(buf, sizeof(*reply));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        reply = mbuffer_head(buf);
        reply->magic = htonl(NBD_REPLY_MAGIC);
        reply->error = retval ? __nbd_errno(retval) : 0;
        memcpy(reply->handle, req->handle, sizeof(req->handle));

        return 0;
err_ret:
        return ret;
}

/*
 * Build and send the NBD reply for @req on corenet socket @sockid.
 *
 * When @_buf is given (read data), it is merged after the reply header. The
 * return value of corenet_tcp_send is not checked.
 * NOTE(review): confirm whether corenet_tcp_send takes ownership of the
 * buffer on failure.
 * Returns 0, or the error from building the header.
 */
static int __nbd_reply(const sockid_t *sockid, int retval, const nbd_request_t *req, buffer_t *_buf)
{
        int ret;
        buffer_t reply;

        ret = __nbd_reply_prep(&reply, retval, req);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (_buf != NULL)
                mbuffer_merge(&reply, _buf);

        corenet_tcp_send(sockid, &reply, 0);
        DBUG("__reply__\n");

        return 0;
err_ret:
        return ret;
}

/*
 * Hook run before each retried I/O attempt in __nbd_write and __nbd_read.
 *
 * Not implemented yet: it invokes UNIMPLEMENTED(__WARN__) and returns 0, so
 * retries always proceed. Declared with (void) so the empty parameter list
 * is a real prototype.
 */
static int __nbd_io_check(void)
{
        UNIMPLEMENTED(__WARN__);

        return 0;
}

/*
 * Execute one NBD_CMD_WRITE for session @ctx and send its reply.
 *
 * @ctx: session the request arrived on; pool and id select the volume
 * @req: request header, already converted to host order by __nbd_newtask
 * @buf: the req->size payload bytes
 *
 * The first write of a session localizes the volume once (stor_localize).
 * ENOSPC from that step is only logged.
 *
 * EAGAIN/ENOSPC from stor_write are retried after a 1000 * 1000 sleep.
 * Retries stop after 100 attempts or once gettime() has advanced 180 since
 * the start (presumably seconds).
 *
 * __nbd_reply is called on every path. Returns 0 or the final error.
 */
static int __nbd_write(nbd_session_t *ctx, const nbd_request_t *req, buffer_t *buf)
{
        int ret, retry = 0;
        const fileid_t *fileid;
        time_t begin = gettime();

        schedule_task_setname("nbd_write");

        fileid = &ctx->id;
        DBUG(CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
             (LLU)req->offset, req->size);

        if (unlikely(ctx->localized == 0)) {
                DINFO(CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
                      (LLU)req->offset, req->size);

                ret = stor_localize(ctx->pool, fileid);
                if (ret) {
                        if (ret == ENOSPC) {
                                DWARN(CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
                                      (LLU)req->offset, req->size);
                                //nothing todo;
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                ctx->localized = 1;
        }

retry:
        if (unlikely(retry)) {
                ret = __nbd_io_check();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /* The direct volume_ctl path is disabled ("&& 0"); entry must be NULL. */
        YASSERT(ctx->entry == NULL);
        if (unlikely(ctx->entry && 0)) {
                io_t io;

                io_init(&io, &ctx->id, NULL, req->offset, req->size, 0);
                ret = volume_ctl_write_direct(ctx->entry, &io, buf, 1);
        } else {
                ret = stor_write(ctx->pool, fileid, buf, req->size, req->offset);
        }
        if (unlikely(ret)) {
                if ((ret == EREMCHG  || ret == ESTALE) && ctx->entry) {
                        if (ctx->entry) {
                                mcache_release(ctx->entry);
                                ctx->entry = NULL;
                        }
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(fileid));
                        goto retry;
                }

                ret = _errno(ret);
                if (retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                    && ((gettime() - begin) < 180)) {
                        if (retry > 10) {
                                DINFO("write "CHKID_FORMAT" (%llu, %llu),"
                                      " ret (%d) %s, need retry %u\n",
                                      CHKID_ARG(fileid),
                                      (LLU)req->offset, (LLU)req->size,
                                      ret, strerror(ret), retry);
                        }

                        retry++;
                        ret = schedule_sleep("nbd_write", 1000 * 1000);
                        if (ret) {
                                DWARN("write cmd(%llu, %llu), ret (%d) %s\n",
                                      (LLU)req->offset, (LLU)req->size,
                                      ret, strerror(ret));
                        }
                        goto retry;
                } else {
                        DWARN("write cmd(%llu, %llu), ret (%d) %s\n",
                              (LLU)req->offset, (LLU)req->size,
                              ret, strerror(ret));
                        SERROR(0, "%s, write cmd(%llu, %llu), ret (%d) %s\n",
                                        M_PROTO_NBD_ERROR, (LLU)req->offset, (LLU)req->size,
                                        ret, strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        DBUG(CHKID_FORMAT" offset %llu size %u finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, req->size);

        YASSERT(buf->len == req->size);
        __nbd_reply(&ctx->sockid, 0, req, NULL);

        return 0;
err_ret:
        __nbd_reply(&ctx->sockid, ret, req, NULL);
        return ret;
}

/*
 * Execute one NBD_CMD_READ for session @ctx and send its reply.
 *
 * @ctx: session the request arrived on; pool and id select the volume
 * @req: request header, already converted to host order by __nbd_newtask
 *
 * A read at an offset above 1 MiB triggers a one-time stor_localize. ENOSPC
 * from that step is only logged.
 *
 * EAGAIN/ENOSPC from stor_read are retried after a 1000 * 1000 sleep.
 * Retries stop after 100 attempts or once gettime() has advanced 180 since
 * the start (presumably seconds).
 *
 * On success the data is sent after the reply header. On failure the reply
 * carries the mapped error. Returns 0 or the final error.
 */
static int __nbd_read(nbd_session_t *ctx, const nbd_request_t *req)
{
        int ret, retry = 0;
        const fileid_t *fileid;
        buffer_t buf;
        time_t begin = gettime();

        schedule_task_setname("nbd_read");

retry:
        if (unlikely(retry)) {
                ret = __nbd_io_check();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        fileid = &ctx->id;

        DBUG(CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
             (LLU)req->offset, req->size);

        if (unlikely(ctx->localized == 0 && req->offset > (1024 * 1024))) {
                DINFO("localize "CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
                      (LLU)req->offset, req->size);

                ret = stor_localize(ctx->pool, fileid);
                if (ret) {
                        if (ret == ENOSPC) {
                                DWARN(CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(fileid),
                                      (LLU)req->offset, req->size);
                                //nothing todo;
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                ctx->localized = 1;
        }

        /* The direct volume_ctl path is disabled ("&& 0"); entry must be NULL. */
        YASSERT(ctx->entry == NULL);
        mbuffer_init(&buf, 0);
        if (unlikely(ctx->entry && 0)) {
                io_t io;

                io_init(&io, &ctx->id, NULL, req->offset, req->size, 0);
                ret = volume_ctl_read_direct(ctx->entry, &io, &buf, 1);
        } else {
                ret = stor_read(ctx->pool, fileid, &buf, req->size, req->offset);
        }
        if (unlikely(ret)) {
                /*
                 * Release whatever the failed attempt may have left in buf.
                 * Every path from here either re-initialises it (retry) or
                 * replies without data, so anything still held would leak.
                 */
                mbuffer_free(&buf);

                if ((ret == EREMCHG || ret == ESTALE) && ctx->entry) {
                        if (ctx->entry) {
                                mcache_release(ctx->entry);
                                ctx->entry = NULL;
                        }
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(fileid));
                        goto retry;
                }

                ret = _errno(ret);
                if (retry < 100 && (ret == EAGAIN || ret == ENOSPC)
                    && ((gettime() - begin) < 180)) {
                        if (retry > 10) {
                                DINFO("read "CHKID_FORMAT" (%llu, %llu),"
                                      " ret (%d) %s, need retry %u\n",
                                      CHKID_ARG(fileid),
                                      (LLU)req->offset, (LLU)req->size,
                                      ret, strerror(ret), retry);
                        }

                        retry++;
                        ret = schedule_sleep("nbd_read", 1000 * 1000);
                        if (ret) {
                                DWARN("read cmd(%llu, %llu), ret (%d) %s\n",
                                      (LLU)req->offset, (LLU)req->size,
                                      ret, strerror(ret));
                        }
                        goto retry;
                } else {
                        DWARN("read cmd(%llu, %llu), ret (%d) %s\n",
                              (LLU)req->offset, (LLU)req->size,
                              ret, strerror(ret));
                        SERROR(0, "%s, read cmd(%llu, %llu), ret (%d) %s\n",
                                        M_PROTO_NBD_ERROR, (LLU)req->offset, (LLU)req->size,
                                        ret, strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        DBUG(CHKID_FORMAT" offset %llu size %u finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, req->size);

        __nbd_reply(&ctx->sockid, 0, req, &buf);

        return 0;
err_ret:
        __nbd_reply(&ctx->sockid, ret, req, NULL);

        return ret;
}

static void __nbd_func(void *arg)
{
        __nbd_request_t *__nbd_request;
        uint16_t command;

        __nbd_request = arg;
        command = __nbd_request->command;

        __nbd_get(__nbd_request->ctx);

        if (command == NBD_CMD_WRITE) {
                __nbd_write(__nbd_request->ctx, &__nbd_request->req, &__nbd_request->buf);
        } else if (command == NBD_CMD_READ) {
                __nbd_read(__nbd_request->ctx, &__nbd_request->req);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        __nbd_put(__nbd_request->ctx);

        mbuffer_free(&__nbd_request->buf);
        mem_cache_free(MEM_CACHE_128, __nbd_request);
}

static int __nbd_newtask(void *ctx, void *buf, int *_count)
{
        int ret, count = 0;
        nbd_request_t req, tmp;
        uint16_t command;
        __nbd_request_t *__nbd_request;
        buffer_t *_buf = buf;

        DBUG("__request__\n");

        while (_buf->len >= sizeof(req)) {
                mbuffer_get(_buf, &req, sizeof(req));

                req.offset = ntohll(req.offset);
                req.type = ntohl(req.type);
                req.size = ntohl(req.size);
                command = req.type & NBD_CMD_MASK_COMMAND;

#ifdef HAVE_STATIC_ASSERT
                static_assert(sizeof(*__nbd_request)  < sizeof(mem_cache128_t), "nbd_request_t");
#endif

                __nbd_request = mem_cache_calloc(MEM_CACHE_128, 0);
                YASSERT(__nbd_request);

                __nbd_request->command = command;
                __nbd_request->req = req;
                mbuffer_init(&__nbd_request->buf, 0);

                switch (command) {
                case NBD_CMD_WRITE:
                        if (req.size + sizeof(req) > _buf->len) {
                                mem_cache_free(MEM_CACHE_128, __nbd_request);
                                DBUG("cmd %u size %u len %u\n", command, req.size, _buf->len);
                                goto out;
                        }

                        ret = mbuffer_popmsg(_buf, &tmp, sizeof(tmp));
                        YASSERT(ret == 0);

                        mbuffer_pop(buf, &__nbd_request->buf, req.size);

                        __nbd_request->ctx = ctx;
                        schedule_task_new("nbd_write", __nbd_func, __nbd_request, -1);

                        break;
                case NBD_CMD_READ:
                        ret = mbuffer_popmsg(_buf, &tmp, sizeof(tmp));
                        YASSERT(ret == 0);

                        __nbd_request->ctx = ctx;
                        schedule_task_new("nbd_read", __nbd_func, __nbd_request, -1);

                        break;
                case NBD_CMD_DISC:
                        DINFO("close conn\n");
                        ret = ECONNRESET;
                        GOTO(err_free, ret);

                        break;
                default:
                        DERROR("bad msgtype %u\n", req.type);
                        YASSERT(0);
                }

                count++;
        }

        if (count) {
                DBUG("new task %u\n", count);
        }

out:
        *_count = count;

        return 0;
err_free:
        mem_cache_free(MEM_CACHE_128, __nbd_request);
//err_ret:
        return ret;
}

/*
 * Release a session's resources and free it.
 *
 * A held mcache entry is asserted impossible. It is nonetheless dropped
 * defensively if YASSERT is compiled out.
 */
static void __nbd_free(nbd_session_t *se)
{
        YASSERT(se->entry == NULL);

        if (unlikely(se->entry != NULL)) {
                mcache_release(se->entry);
                se->entry = NULL;
        }

        DINFO("session %s/"CHKID_FORMAT" free\n", se->export_name,
              CHKID_ARG(&se->id));

        yfree((void **)&se);
}

/* Take one reference on @se for an in-flight I/O task (not atomic). */
static void __nbd_get(nbd_session_t *se)
{
        se->ref += 1;
}

/*
 * Drop one reference on @se. The last put frees the session, but only if
 * __nbd_close has already marked it for erasure.
 */
static void __nbd_put(nbd_session_t *se)
{
        se->ref -= 1;
        if (se->ref != 0 || !se->erase)
                return;

        __nbd_free(se);
}

/*
 * Core close hook for an NBD session.
 *
 * Marks the session for erasure. With no I/O task holding a reference, the
 * session is freed immediately. Otherwise the process exits with EAGAIN.
 */
static void __nbd_close(void *arg)
{
        nbd_session_t *se = arg;

        DINFO("session %s/"CHKID_FORMAT" closed\n", se->export_name,
              CHKID_ARG(&se->id));

        se->erase = 1;

        if (se->ref == 0) {
                DINFO("there is no busy cmd to wait, free now\n");
                __nbd_free(se);
                return;
        }

        /*
         * Outstanding jobs must finish before the connection goes away.
         * If a new connection from the same client starts while old writes
         * are still in flight, writes can fail (seen with the lenovo network
         * disk system). The chosen remedy is to restart lichd; waiting here
         * for the jobs to drain would be the alternative.
         */
        DWARN("wanting thread's job done: nr(%u)\n", se->ref);
        EXIT(EAGAIN);
}

/*
 * Disabled (#if 0), kept for reference only.
 *
 * Intended as a periodic check: every 60 * 10 gettime() units (presumably
 * seconds) it would test whether the session's volume is local, caching or
 * releasing session->entry via __nbd_get_entry. Even if re-enabled, it
 * returns right after the YASSERT, so the rest is unreachable as written.
 */
#if 0
static int __nbd_srv_check()
{
        int ret;
        vm_t *vm = vm_self();
        nbd_session_t *session = vm->ctx;
        time_t now;
        mcache_entry_t *entry;

        YASSERT(session->entry == NULL);
        return 0;

        now = gettime();
        if (now - session->last_check < 60 * 10) {
                return 0;
        }

        session->last_check = now;

        if (session->entry == NULL) {
                DINFO("check %s localize\n", session->export_name);

                ret = __nbd_get_entry(&session->id, &entry);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("%s not localized\n", session->export_name);
                        } else {
                                DWARN("%s check fail %u %s\n", session->export_name, ret, strerror(ret));
                        }
                } else {
                        session->entry = entry;
                        DINFO("%s localized\n", session->export_name);
                }
        } else {
                DINFO("recheck %s localize\n", session->export_name);

                ret = __nbd_get_entry(&session->id, &entry);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("release %s localize\n", session->export_name);
                                if (session->entry) {
                                        mcache_release(session->entry);
                                        session->entry = NULL;
                                }
                        } else {
                                DWARN("%s check fail %u %s\n", session->export_name, ret, strerror(ret));
                        }
                } else {
                        YASSERT(session->entry == entry);
                        mcache_release(entry);
                        DINFO("check %s localize ok\n", session->export_name);
                }
        }

        return 0;
}
#endif

/*
 * Register a negotiated session with the core event loop.
 *
 * Re-resolves pool + export_name to choose a core via core_hash and fills in
 * the corenet socket id. It then attaches the session with __nbd_newtask as
 * the request handler and __nbd_close as the close hook.
 * Returns 0 or the error from stor_lookup1/core_attach.
 */
static int  __nbd_srv_attach(nbd_session_t *se)
{
        int ret, hash;
        fileid_t fileid;
        sockid_t *sockid;

        ret = stor_lookup1(se->pool, se->export_name, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        hash = core_hash(&fileid);

        sockid = &se->sockid;
        sockid->sd = se->sd;
        sockid->seq = _random();
        sockid->type = SOCKID_CORENET;
        sockid->addr = se->addr;

        ret = core_attach(hash, sockid, "nbd_srv", se,
                          __nbd_newtask, __nbd_close, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Per-connection thread: negotiate, tune the socket and attach the session.
 *
 * Takes ownership of @arg (an nbd_session_t) and its socket. On any failure
 * both are released here. Always returns NULL.
 */
static void *__nbd_accept__(void *arg)
{
        int ret;
        nbd_session_t *se = arg;

        DBUG("negotiation with %d\n", se->sd);

        ret = __nbd_negotiate(se->sd, se);
        if (unlikely(ret)) {
                DINFO("close %d\n", se->sd);
                goto err_fd;
        }

        if (se->type == __TYPE_UNIX__)
                ret = sock_unix_tuning(se->sd);
        else
                ret = tcp_sock_tuning(se->sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        ret = __nbd_srv_attach(se);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        return NULL;
err_fd:
        close(se->sd);
        yfree((void **)&se);
        return NULL;
}

/*
 * Accept one pending TCP connection and negotiate it on a detached thread.
 *
 * On success the new __nbd_accept__ thread owns both the socket and the
 * session. Returns 0, errno from accept(), or the allocation/pthread error.
 * Nothing is leaked on failure.
 */
static int __nbd_tcp_accept(void)
{
        int ret, sd;
        nbd_session_t *se;
        pthread_t th;
        pthread_attr_t ta;
        struct sockaddr_in sin;
        socklen_t alen;

        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(sin);

        /* Explicit cast: portable without relying on glibc's transparent __SOCKADDR_ARG. */
        sd = accept(__tcp_listen_sd__, (struct sockaddr *)&sin, &alen);
        if (sd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = nbd_session_alloc(&se, sd, __TYPE_TCP__, sin.sin_addr.s_addr);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        ret = pthread_attr_init(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __nbd_accept__, se);
        /* The attribute object is no longer needed, whether or not the thread started. */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&se);
err_fd:
        close(sd);
err_ret:
        return ret;
}

/**
 * TCP listener thread: poll the listen socket and accept connections
 * until a non-recoverable error occurs.
 *
 * Transient failures are retried rather than ending the thread. Nothing
 * restarts this thread, so exiting would silently stop NBD over TCP.
 * The retried failures are:
 * - EINTR from poll or accept;
 * - ECONNABORTED/EPROTO when a peer resets before accept;
 * - EAGAIN from a non-blocking accept or from pthread_create hitting a
 *   resource limit.
 * None of these leaves a connection queued. That holds because the
 * aborted connection is dropped by the kernel, or because
 * __nbd_tcp_accept has already closed the accepted socket. So retrying
 * cannot busy-loop on the same event.
 *
 * @param _arg  unused.
 * @return      always NULL.
 */
static void *__nbd_tcp_passive__(void *_arg)
{
        int ret;

        (void) _arg;
        DINFO("start...\n");

        while (1) {
                ret = sock_poll_sd(__tcp_listen_sd__, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME || ret == EINTR)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                DINFO("tcp event\n");

                ret = __nbd_tcp_accept();
                if (unlikely(ret)) {
                        if (ret == EINTR || ret == ECONNABORTED || ret == EPROTO
                            || ret == EAGAIN || ret == EWOULDBLOCK) {
                                DINFO("tcp accept fail, ret %d, retry\n", ret);
                                continue;
                        }

                        GOTO(err_ret, ret);
                }
        }

        return NULL;
err_ret:
        return NULL;
}

static int __nbd_tcp_passive()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        char port[MAX_NAME_LEN];

        snprintf(port, MAX_NAME_LEN, "%u", gloconf.nbd_port);
        ret = tcp_sock_hostlisten(&__tcp_listen_sd__, NULL, port,
                                  YNET_QLEN, YNET_RPC_BLOCK, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __nbd_tcp_passive__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Accept one pending unix-domain connection and hand it to a detached
 * __nbd_accept__ thread for negotiation.
 *
 * @return 0 on success, otherwise an errno-style code.  Errors from
 *         accept() are returned before any connection is consumed.
 *         Later errors (session allocation, pthread_create) are returned
 *         after the accepted socket has been closed.
 */
static int __nbd_unix_accept()
{
        int ret, sd;
        nbd_session_t *se;
        socklen_t alen;
        pthread_t th;
        pthread_attr_t ta;
        struct sockaddr_un addr;

        _memset(&addr, 0, sizeof(addr));
        alen = sizeof(addr);

        /* accept() takes a generic struct sockaddr *; cast explicitly. */
        sd = accept(__unix_listen_sd__, (struct sockaddr *)&addr, &alen);
        if (sd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        /* Unix-domain peers have no IPv4 address. 123 appears to be an
         * arbitrary placeholder for the session's addr field.
         * NOTE(review): confirm nothing interprets it as a real address. */
        ret = nbd_session_alloc(&se, sd, __TYPE_UNIX__, 123);
        if (unlikely(ret)) {
                GOTO(err_fd, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __nbd_accept__, se);
        /* The attribute object is only needed for creation; release it. */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&se);
err_fd:
        close(sd);
err_ret:
        return ret;
}

/**
 * Unix-domain listener thread: poll the listen socket and accept
 * connections until a non-recoverable error occurs.
 *
 * Transient failures are retried rather than ending the thread. Nothing
 * restarts this thread, so exiting would silently stop NBD over the unix
 * socket. The retried failures are:
 * - EINTR from poll or accept;
 * - ECONNABORTED/EPROTO from accept;
 * - EAGAIN from a non-blocking accept or from pthread_create hitting a
 *   resource limit.
 * None of these leaves a connection queued, so retrying cannot busy-loop.
 *
 * @param _arg  unused.
 * @return      always NULL.
 */
static void *__nbd_unix_passive__(void *_arg)
{
        int ret;

        (void) _arg;

        while (1) {
                ret = sock_poll_sd(__unix_listen_sd__, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME || ret == EINTR)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                DINFO("unix event\n");

                ret = __nbd_unix_accept();
                if (unlikely(ret)) {
                        if (ret == EINTR || ret == ECONNABORTED || ret == EPROTO
                            || ret == EAGAIN || ret == EWOULDBLOCK) {
                                DINFO("unix accept fail, ret %d, retry\n", ret);
                                continue;
                        }

                        GOTO(err_ret, ret);
                }
        }

        return NULL;
err_ret:
        return NULL;
}

static int __nbd_unix_passive()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sock_unix_listen(gloconf.nbd_sock, &__unix_listen_sd__, &__unix_listen_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("listen @ %s, sd %u\n", gloconf.nbd_sock, __unix_listen_sd__);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __nbd_unix_passive__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Bootstrap thread: start the TCP listener, then the unix-domain
 * listener.  Each one is retried every 2 seconds while its
 * address is in use (EADDRINUSE); any other error aborts start-up.
 *
 * The two listeners are retried in separate loops. With a single shared
 * loop, an EADDRINUSE on the unix socket would re-run the TCP setup. That
 * would re-bind a port already held by the running TCP listener, leaking
 * its socket and/or spawning a second listener thread.
 *
 * @param _arg  unused.
 * @return      always NULL.
 */
static void *__nbd_service(void *_arg)
{
        int ret;

        (void) _arg;
        DINFO("nbd start...\n");

        while (1) {
                ret = __nbd_tcp_passive();
                if (ret == 0)
                        break;

                if (ret != EADDRINUSE)
                        GOTO(err_ret, ret);

                sleep(2);
        }

        while (1) {
                ret = __nbd_unix_passive();
                if (ret == 0)
                        break;

                if (ret != EADDRINUSE)
                        GOTO(err_ret, ret);

                sleep(2);
        }

        DINFO("nbd service inited\n");

        return NULL;
err_ret:
        return NULL;
}

/**
 * Start the NBD service in the background.
 *
 * Spawns a detached __nbd_service thread that sets up the TCP and
 * unix-domain listeners; this call does not wait for them.
 *
 * @return 0 if the bootstrap thread was created, otherwise the error
 *         code returned by pthread_create.
 */
int nbd_srv()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __nbd_service, NULL);
        /* The attribute object is only needed for creation; release it. */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
